package com.longge.bigfile.service.impl;

import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.DefaultTuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.web.multipart.MultipartFile;

import com.longge.bigfile.config.BigfileConfig;
import com.longge.bigfile.constant.ErrorConstants;
import com.longge.bigfile.dto.request.BaseDto;
import com.longge.bigfile.dto.request.PostUploadRequestDto;
import com.longge.bigfile.dto.request.PreUploadRequestDto;
import com.longge.bigfile.dto.response.UploadResponseDto;
import com.longge.bigfile.service.ProcessService;
import com.longge.bigfile.util.FileUtils;
import com.longge.bigfile.util.RedisKeyUtils;
import com.longge.bigfile.util.SliceUtils;
import com.longge.common.dto.GlobalResponse;
import com.longge.common.util.DateUtils;
import com.longge.common.util.RedisUtils;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

/**
 * Abstract implementation of {@link ProcessService}. Template-method base class:
 * the shared upload orchestration (distributed locking, Redis slice bookkeeping,
 * progress accounting) lives here, while the cloud-storage-specific operations
 * (multipart init, part upload, completion) are delegated to abstract hooks so
 * that code common to the different storage backends is reused.
 *
 * @author yangzhilong
 */
@Slf4j
public abstract class AbstractProcessServiceImpl implements ProcessService {
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private BigfileConfig bigfileConfig;

    /**
     * Pre-upload handshake. Under a per-file distributed lock, either resumes an
     * in-progress upload (Redis hash already exists) or initializes a brand-new
     * one, returning the first slice the client should send.
     *
     * @param dto request carrying the file MD5, name and total size
     * @return slice info to upload next, the final file path if already complete,
     *         or a failure when another uploader holds the lock
     */
    @Override
    public Mono<GlobalResponse<UploadResponseDto>> preUpload(PreUploadRequestDto dto) {
        RLock lock = redissonClient.getLock(RedisKeyUtils.getFileLock(dto));
        boolean lockFlag = false;
        try {
            // Wait up to 5s to acquire; the lease auto-expires after the configured timeout.
            lockFlag = lock.tryLock(5, bigfileConfig.getFileLockTimeOutSecond(), TimeUnit.SECONDS);
            if(!lockFlag) {
                log.warn("Another person is uploading， MD5 is :{}", dto.getMd5());
                return Mono.just(GlobalResponse.fail(ErrorConstants.FILE_IS_UPLOADING_ERROR));
            }

            String hashRedisKey = RedisKeyUtils.getBigFileInfoHashKey(dto);
            // if hasKey, this file is not first upload
            if (RedisUtils.hasKey(hashRedisKey)) {
                return Mono.just(secondPreUpload(dto, hashRedisKey));
            }
            setThreadLocal(dto);

            return Mono.just(initPreUpload(dto, hashRedisKey));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            log.error("redis lock is interrupted", e);
            return Mono.just(GlobalResponse.fail(ErrorConstants.FILE_IS_UPLOADING_ERROR));
        } finally {
            if(lockFlag) {
                lock.unlock();
            }
            removeThreadLocal();
        }
    }

    /**
     * Uploads one slice of the file. Validates the request against the Redis
     * bookkeeping, guards the slice with a distributed lock (double-checked
     * against the wait set), delegates the actual byte transfer to the storage
     * hooks, and completes the multipart upload when this was the last slice.
     *
     * @param file the slice payload
     * @param dto  request carrying MD5 and the slice's byte range
     * @return the next slice to upload, the final file path when done, or a failure
     */
    @Override
    public Mono<GlobalResponse<UploadResponseDto>> postUpload(MultipartFile file, PostUploadRequestDto dto) {
        String hashRedisKey = RedisKeyUtils.getBigFileInfoHashKey(dto);
        Map<String, String> hashValues = RedisUtils.HashOps.getAll(hashRedisKey);

        String md5 = dto.getMd5();

        log.info("begin to check upload data， md5:{}", md5);
        ErrorConstants checkResult = checkUpload(file, dto, hashValues);
        if(!ErrorConstants.SUCCESS.equals(checkResult)) {
            log.error("check upload fail, result is:{}", checkResult.getDesc());
            return Mono.just(GlobalResponse.fail(checkResult));
        }

        long sliceSize = getSliceSizeFromMap(hashValues);
        int sliceIndex = getSliceIndex(dto.getFileStart(), sliceSize);

        // A null score means the slice is no longer in the wait set, i.e. already uploaded.
        String waitRedisKey = RedisKeyUtils.getWaitSliceKey(dto);
        Double score = RedisUtils.ZSetOps.score(waitRedisKey, String.valueOf(sliceIndex));
        if(null == score) {
            log.warn("Another person is uploading， MD5 is :{}, slice index is:{}, return next slice to upload", dto.getMd5(), sliceIndex);
            return Mono.just(getNextSliceInfo(waitRedisKey, hashRedisKey, hashValues, sliceIndex));
        }

        log.info("check success, begin process upload");

        RLock sliceLock = redissonClient.getLock(RedisKeyUtils.getSliceLock(dto, sliceIndex));
        boolean lockFlag = false;
        try {
            lockFlag = sliceLock.tryLock(1, bigfileConfig.getSliceLockTimeOutSecond(), TimeUnit.SECONDS);
            if(!lockFlag) {
                log.warn("Another person is uploading， MD5 is :{}, slice index is:{}", dto.getMd5(), sliceIndex);
                return Mono.just(getNextSliceInfo(waitRedisKey, hashRedisKey, hashValues, sliceIndex));
            }
            // double check: the slice may have been finished between the first check and lock acquisition
            score = RedisUtils.ZSetOps.score(waitRedisKey, String.valueOf(sliceIndex));
            if(null == score) {
                log.warn("Another person is upload end， MD5 is :{}, slice index is:{}, return next slice to upload", dto.getMd5(), sliceIndex);
                return Mono.just(getNextSliceInfo(waitRedisKey, hashRedisKey, hashValues, sliceIndex));
            }

            setThreadLocal(dto);

            Long totalSlice = getTotalSliceFromMap(hashValues);
            String uploadId = getUploadIdFromMap(hashValues);
            //  file path
            String realFilePath = FileUtils.getRealFilePath(md5);
            // upload temp file/only one slice file to s3/oss
            String eTag = uploadFile(file, realFilePath, totalSlice, uploadId, sliceIndex);
            if(StringUtils.isBlank(eTag)) {
                return Mono.just(GlobalResponse.fail(ErrorConstants.UPLOAD_ERROR));
            }

            if(1 == totalSlice) {
                // only one slice
                return Mono.just(processOnlyOneSlice(hashRedisKey, waitRedisKey, realFilePath, sliceIndex));
            }

            // add to redis uploaded eTag set
            String endRedisKey = RedisKeyUtils.getEndSliceKey(dto);

            // Record the slice's eTag (scored by slice index so completion order is preserved).
            RedisUtils.getRedisTemplate().executePipelined((RedisCallback<String>) connection -> {
                connection.multi();
                connection.zAdd(endRedisKey.getBytes(), sliceIndex, eTag.getBytes());
                connection.expire(endRedisKey.getBytes(), bigfileConfig.getRedisTimeOutSecond());
                connection.exec();
                return null;
            });

            long waitSize = RedisUtils.ZSetOps.size(waitRedisKey);
            if(1 == waitSize) {
                // last one part
                return Mono.just(lastPartUpload(hashValues, endRedisKey, uploadId, hashRedisKey, waitRedisKey, sliceIndex));
            }
            // remove redis wait set
            RedisUtils.ZSetOps.remove(waitRedisKey, String.valueOf(sliceIndex));
            return Mono.just(getNextSliceInfo(waitRedisKey, hashRedisKey, hashValues, sliceIndex));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            log.warn("redis lock is interrupted", e);
            return Mono.just(GlobalResponse.fail(ErrorConstants.UPLOAD_ERROR));
        } finally {
            if(lockFlag) {
                sliceLock.unlock();
            }
            removeThreadLocal();
        }
    }

    /**
     * Stores request context in a ThreadLocal for the storage backend.
     */
    public abstract void setThreadLocal(BaseDto dto);

    /**
     * Clears the ThreadLocal context; always invoked from finally blocks.
     */
    public abstract void removeThreadLocal();

    /**
     * Initializes a multipart upload on the storage backend.
     *
     * @param realFileKey destination object key
     * @return the backend's global uploadId for the multipart session
     */
    public abstract String initUpload(String realFileKey);

    /**
     * Uploads the whole file in one shot when it is too small to slice.
     *
     * @param file        the file payload
     * @param realFileKey destination object key
     * @return eTag of the stored object
     */
    public abstract String uploadRealFile(MultipartFile file, String realFileKey);

    /**
     * Uploads a single part of a multipart upload.
     *
     * @param file        the slice payload
     * @param realFileKey destination object key
     * @param uploadId    multipart session id from {@link #initUpload(String)}
     * @param sliceIndex  zero-based index of this slice
     * @return eTag of the uploaded part
     */
    public abstract String uploadPartFile(MultipartFile file, String realFileKey, String uploadId, int sliceIndex);

    /**
     * Completes the multipart upload after every part has been sent.
     *
     * @param uploadId    multipart session id
     * @param realFileKey destination object key
     * @param endETags    eTags of all uploaded parts
     * @return eTag of the assembled object
     */
    public abstract String completedUpload(String uploadId, String realFileKey, Set<String> endETags);

    /**
     * Validates a slice-upload request against the bookkeeping stored in Redis.
     *
     * @param file       the slice payload
     * @param dto        request carrying the slice byte range
     * @param hashValues the file's Redis hash (slice size, total size, ...)
     * @return {@link ErrorConstants#SUCCESS} or the specific validation failure
     */
    private ErrorConstants checkUpload(MultipartFile file, PostUploadRequestDto dto, Map<String, String> hashValues) {
        if (null==hashValues || hashValues.isEmpty()) {
            log.error("redis key is not exists");
            return ErrorConstants.REDIS_KEY_NOT_EXISTS_ERROR;
        }
        if(dto.getFileEnd().compareTo(dto.getFileStart()) < 0) {
            log.error("fileEnd must more than the fileBegin");
            return ErrorConstants.FILE_END_MUST_MORE_THEN_BEGIN_ERROR;
        }

        // Slices must start on a slice-size boundary and span at most one slice.
        long sliceSize = getSliceSizeFromMap(hashValues);
        if(dto.getFileStart() % sliceSize != 0) {
            log.error("fileStart is error");
            return ErrorConstants.FILE_BEGIN_ERROR;
        }

        long size = dto.getFileEnd()-dto.getFileStart();
        if(size > sliceSize) {
            log.error("fileEnd is error");
            return ErrorConstants.FILE_END_ERROR;
        }

        // The payload size must match the declared byte range exactly.
        long fileSize = file.getSize();
        if(fileSize != size) {
            log.error("file size  is error");
            return ErrorConstants.FILE_SIZE_ERROR;
        }
        return ErrorConstants.SUCCESS;
    }

    /**
     * Computes the zero-based slice index from the slice's start offset.
     *
     * @param fileStart start offset in bytes (a multiple of sliceSize, enforced by checkUpload)
     * @param sliceSize slice size in bytes
     * @return the slice index
     */
    private int getSliceIndex(Long fileStart, Long sliceSize) {
        // Long division truncates; no need for the deprecated new Double(...) boxing.
        return (int) (fileStart / sliceSize);
    }

    /**
     * Picks the next slice still waiting to be uploaded. When none remain, polls
     * (once per second, bounded by sliceLockTimeOutSecond) for the final file
     * path written by whichever uploader finishes last.
     *
     * @param waitRedisKey      zset of slice indexes still pending
     * @param hashRedisKey      the file's Redis hash key
     * @param hashValues        cached contents of the file's Redis hash
     * @param currentSliceIndex the slice index just processed/attempted
     * @return the next slice's info, the final path, or a someone-is-uploading failure
     */
    private GlobalResponse<UploadResponseDto> getNextSliceInfo(String waitRedisKey, String hashRedisKey,  Map<String, String> hashValues, int currentSliceIndex) {
        String sliceIndex = RedisUtils.ZSetOps.getNextValue(waitRedisKey, currentSliceIndex);
        if(StringUtils.isBlank(sliceIndex)) {
            String s3File = RedisUtils.HashOps.get(hashRedisKey, RedisKeyUtils.PATH);
            int count = 0;
            int maxCount = bigfileConfig.getSliceLockTimeOutSecond();
            // wait at most sliceLockTimeOutSecond seconds for the last slice to finish
            while (StringUtils.isBlank(s3File) && count<maxCount) {
                try {
                    Thread.sleep(1000L);
                    log.info("wait file upload end {} second, redis key:{}", count+1, hashRedisKey);
                } catch (InterruptedException e) {
                    // Stop waiting and restore the interrupt flag instead of swallowing it.
                    Thread.currentThread().interrupt();
                    break;
                }
                s3File = RedisUtils.HashOps.get(hashRedisKey, RedisKeyUtils.PATH);
                count++;
            }
            if(StringUtils.isBlank(s3File)) {
                return GlobalResponse.fail(ErrorConstants.SOMEONE_IS_UPLOADING_SLICE_ERROR);
            }
            return GlobalResponse.success(UploadResponseDto.builder().completionRatio(1f).s3FilePath(s3File).build());
        }
        return getSliceInfo(hashValues, Integer.valueOf(sliceIndex), waitRedisKey);
    }

    /**
     * Dispatches the upload to the single-shot or multipart hook depending on
     * the total slice count.
     *
     * @param file         the payload
     * @param realFilePath destination object key
     * @param totalSlice   total number of slices for the file
     * @param uploadId     multipart session id (unused when totalSlice == 1)
     * @param sliceIndex   zero-based slice index
     * @return eTag returned by the storage backend
     */
    private String uploadFile(MultipartFile file, String realFilePath, Long totalSlice, String uploadId, int sliceIndex) {
        if(1 == totalSlice) {
            return uploadRealFile(file, realFilePath);
        }
        return uploadPartFile(file, realFilePath, uploadId, sliceIndex);
    }

    /**
     * Finalizes a single-slice upload: records the final path in the file hash
     * and removes the slice from the wait set atomically, then reports 100%.
     */
    private GlobalResponse<UploadResponseDto> processOnlyOneSlice(String hashRedisKey, String waitRedisKey, String s3File, Integer sliceIndex) {
        RedisUtils.getRedisTemplate().executePipelined((RedisCallback<String>) connection -> {
            connection.multi();
            connection.hSet(hashRedisKey.getBytes(), RedisKeyUtils.PATH.getBytes(), s3File.getBytes());
            connection.zRem(waitRedisKey.getBytes(), String.valueOf(sliceIndex).getBytes());
            connection.exec();
            return null;
        });
        UploadResponseDto respDto = UploadResponseDto.builder()
            .completionRatio(1f)
            .s3FilePath(s3File)
            .build();
        return GlobalResponse.success(respDto);
    }

    /**
     * Handles the final slice: asks the backend to assemble the parts, records
     * the final path, and deletes the intermediate wait/end keys atomically.
     *
     * @param hashValues   cached contents of the file's Redis hash
     * @param endRedisKey  zset of uploaded part eTags
     * @param uploadId     multipart session id
     * @param hashRedisKey the file's Redis hash key
     * @param waitRedisKey zset of pending slice indexes
     * @param sliceIndex   index of the final slice
     * @return the completed file's path, or a completion failure
     */
    private GlobalResponse<UploadResponseDto> lastPartUpload(Map<String, String> hashValues, String endRedisKey, String uploadId, String hashRedisKey, String waitRedisKey, int sliceIndex) {
        String realFile = FileUtils.getRealFilePath(getMd5FromMap(hashValues));
        // the file upload end
        Set<String> endETags = RedisUtils.ZSetOps.getAll(endRedisKey);

        try {
            String eTag = completedUpload(uploadId, realFile, endETags);
            if(StringUtils.isBlank(eTag)) {
                return GlobalResponse.fail(ErrorConstants.COMPLETE_ERROR);
            }
        } catch (Exception e) {
            log.error("s3 complete file error", e);
            return GlobalResponse.fail(ErrorConstants.COMPLETE_ERROR);
        }

        RedisUtils.getRedisTemplate().executePipelined((RedisCallback<String>) connection -> {
            connection.multi();
            connection.hSet(hashRedisKey.getBytes(), RedisKeyUtils.PATH.getBytes(), realFile.getBytes());
            // drop the intermediate bookkeeping keys
            connection.del(waitRedisKey.getBytes(), endRedisKey.getBytes());
            connection.exec();
            return null;
        });
        return GlobalResponse.success(UploadResponseDto.builder().completionRatio(1f).s3FilePath(realFile).build());
    }

    /**
     * First-time initialization in the pre-upload phase: computes the slice plan,
     * opens a multipart session when needed, and seeds the Redis wait set and
     * file-info hash atomically.
     *
     * @param dto          the pre-upload request
     * @param hashRedisKey the file's Redis hash key
     * @return info for the first slice the client should upload
     */
    private GlobalResponse<UploadResponseDto> initPreUpload(PreUploadRequestDto dto, String hashRedisKey) {
        Long sliceSize = bigfileConfig.getSliceSize();
        int totalSlice = SliceUtils.getTotalSlice(sliceSize, dto.getTotalSize());

        // Only multi-slice files need a multipart session on the backend.
        String uploadId = null;
        if(totalSlice > 1) {
            try {
                String realFileKey = FileUtils.getRealFilePath(dto.getMd5());
                uploadId = initUpload(realFileKey);
                if(StringUtils.isBlank(uploadId)) {
                    return GlobalResponse.fail(ErrorConstants.INIT_UPLOAD_ERROR);
                }
            } catch (Exception e) {
                log.error("init s3 upload error", e);
                return GlobalResponse.fail(ErrorConstants.INIT_UPLOAD_ERROR);
            }
        }

        Long[] split = SliceUtils.getSliceStartAndEnd(sliceSize, dto.getTotalSize(), 0);

        // Seed the wait zset with every slice index, scored by its own index.
        Set<RedisZSetCommands.Tuple> tuples = new HashSet<>(totalSlice);
        for(int i=0; i<totalSlice; i++) {
            RedisZSetCommands.Tuple val = new DefaultTuple(String.valueOf(i).getBytes(), Double.valueOf(i));
            tuples.add(val);
        }

        Map<byte[], byte[]> map = new HashMap<>();
        map.put(RedisKeyUtils.FILE_NAME.getBytes(), dto.getFileName().getBytes());
        map.put(RedisKeyUtils.BEGIN_DATE.getBytes(), DateUtils.formatDate(new Date(), DateUtils.Pattern.Date.YYYYMMDD).getBytes());
        map.put(RedisKeyUtils.SLICE_SIZE.getBytes(), String.valueOf(sliceSize).getBytes());
        map.put(RedisKeyUtils.TOTAL_SIZE.getBytes(), String.valueOf(dto.getTotalSize()).getBytes());
        map.put(RedisKeyUtils.TOTAL_SLICE.getBytes(), String.valueOf(totalSlice).getBytes());
        map.put(RedisKeyUtils.MD5.getBytes(), dto.getMd5().getBytes());
        if(totalSlice > 1) {
            map.put(RedisKeyUtils.UPLOAD_ID.getBytes(), uploadId.getBytes());
        }

        String waitRedisKey = RedisKeyUtils.getWaitSliceKey(dto);

        RedisUtils.getRedisTemplate().executePipelined((RedisCallback<String>) connection -> {
            connection.multi();
            connection.zAdd(waitRedisKey.getBytes(), tuples);
            connection.expire(waitRedisKey.getBytes(), bigfileConfig.getRedisTimeOutSecond());

            connection.hashCommands().hMSet(hashRedisKey.getBytes(), map);
            connection.expire(hashRedisKey.getBytes(), bigfileConfig.getRedisTimeOutSecond());
            connection.exec();
            return null;
        });

        UploadResponseDto respDto = UploadResponseDto.builder().fileStart(split[0]).fileEnd(split[1]).sliceSize(bigfileConfig.getSliceSize()).completionRatio(0f).build();
        return GlobalResponse.success(respDto);
    }

    /**
     * Pre-upload path for a file whose Redis state already exists: returns the
     * final path when done, otherwise the first pending slice.
     *
     * @param dto          the pre-upload request
     * @param hashRedisKey the file's Redis hash key
     * @return final path, next slice info, or someone-is-uploading failure
     */
    private GlobalResponse<UploadResponseDto> secondPreUpload(PreUploadRequestDto dto, String hashRedisKey) {
        Map<String, String> values = RedisUtils.HashOps.getAll(hashRedisKey);
        String s3FilePath = getRealFileFromMap(values);

        // upload is end
        if(StringUtils.isNotBlank(s3FilePath)) {
            return GlobalResponse.success(UploadResponseDto.builder().completionRatio(1f).s3FilePath(s3FilePath).build());
        }

        String waitRedisKey = RedisKeyUtils.getWaitSliceKey(dto);
        String sliceIndex = RedisUtils.ZSetOps.getFirst(waitRedisKey);
        if(Objects.isNull(sliceIndex)) {
            log.warn("the last slice is uploading by someone, md5:[{}]", dto.getMd5());
            return GlobalResponse.fail(ErrorConstants.SOMEONE_IS_UPLOADING_SLICE_ERROR);
        }

        Integer currentSlice = Integer.valueOf(sliceIndex);

        return getSliceInfo(values, currentSlice, waitRedisKey);
    }

    /**
     * Builds the response describing an already-initialized slice: its byte
     * range, the slice size and the current completion ratio.
     *
     * @param values       cached contents of the file's Redis hash
     * @param currentSlice the slice index to describe
     * @param waitRedisKey zset of pending slice indexes (used for the ratio)
     * @return slice info for the client
     */
    private GlobalResponse<UploadResponseDto> getSliceInfo(Map<String, String> values, Integer currentSlice, String waitRedisKey) {
        Long totalSize = getTotalSizeFromMap(values);
        Long sliceSize = getSliceSizeFromMap(values);
        Long totalSlice = getTotalSliceFromMap(values);
        Long waitSlice = RedisUtils.ZSetOps.size(waitRedisKey);

        Long[] split = SliceUtils.getSliceStartAndEnd(sliceSize, totalSize, currentSlice);
        float completionRatio = SliceUtils.getCompletionRatio(totalSlice, waitSlice);

        UploadResponseDto respDto = UploadResponseDto.builder().fileStart(split[0]).fileEnd(split[1]).sliceSize(sliceSize).completionRatio(completionRatio).build();
        return GlobalResponse.success(respDto);
    }

    /**
     * Total number of slices, read from the file's Redis hash.
     */
    private Long getTotalSliceFromMap(Map<String, String> hashValues) {
        return Long.valueOf(hashValues.get(RedisKeyUtils.TOTAL_SLICE));
    }

    /**
     * Total file size in bytes, read from the file's Redis hash.
     */
    private Long getTotalSizeFromMap(Map<String, String> hashValues) {
        return Long.valueOf(hashValues.get(RedisKeyUtils.TOTAL_SIZE));
    }

    /**
     * Size of a single slice in bytes, read from the file's Redis hash.
     */
    private Long getSliceSizeFromMap(Map<String, String> hashValues) {
        return Long.valueOf(hashValues.get(RedisKeyUtils.SLICE_SIZE));
    }

    /**
     * The file's MD5, read from the file's Redis hash.
     */
    private String getMd5FromMap(Map<String, String> hashValues) {
        return hashValues.get(RedisKeyUtils.MD5);
    }

    /**
     * The object key on the cloud storage; blank until the upload completes.
     */
    private String getRealFileFromMap(Map<String, String> hashValues) {
        return hashValues.get(RedisKeyUtils.PATH);
    }

    /**
     * The backend multipart uploadId; absent for single-slice files.
     */
    private String getUploadIdFromMap(Map<String, String> hashValues) {
        return hashValues.get(RedisKeyUtils.UPLOAD_ID);
    }
}
